/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.kafka;



import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import org.apache.kafka.common.TopicPartition;

import org.joda.time.Duration;

import org.joda.time.Instant;



import java.io.Serializable;

import java.util.Optional;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**

 * An extendable factory to create a {@link TimestampPolicy} for each partition at runtime by

 * KafkaIO reader. Subclasses implement {@link #createTimestampPolicy}, which is invoked by the the

 * reader while starting or resuming from a checkpoint. Two commonly used policies are provided. See

 * {@link #withLogAppendTime()} and {@link #withProcessingTime()}.

 */

@FunctionalInterface

public interface TimestampPolicyFactory<KeyT, ValueT> extends Serializable {



  /**

   * Creates a TimestampPolicy for a partition. This is invoked by the reader at the start or while

   * resuming from previous checkpoint.

   *

   * @param tp The returned policy applies to records from this {@link TopicPartition}.

   * @param previousWatermark The latest check-pointed watermark. This is set when the reader is

   *     resuming from a checkpoint. This is a good value to return by implementations of {@link

   *     TimestampPolicy#getWatermark(TimestampPolicy.PartitionContext)} until a better watermark can be established

   *     as more records are read.

   */

  TimestampPolicy<KeyT, ValueT> createTimestampPolicy(

	  TopicPartition tp, Optional<Instant> previousWatermark);



  /**

   * A {@link TimestampPolicy} that assigns processing time to each record. Specifically, this is

   * the timestamp when the record becomes 'current' in the reader. The watermark aways advances to

   * current time.

   */

  static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {

    return (tp, prev) -> new ProcessingTimePolicy<>();

  }



  /**

   * A {@link TimestampPolicy} that assigns Kafka's log append time (server side ingestion time) to

   * each record. The watermark for each Kafka partition is the timestamp of the last record read.

   * If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'. See

   * {@link KafkaIO.Read#withLogAppendTime()} for longer description.

   */

  static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {

    return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);

  }



  /**

   * {@link CustomTimestampPolicyWithLimitedDelay} using {@link KafkaTimestampType#CREATE_TIME} from

   * the record for timestamp. See {@link KafkaIO.Read#withCreateTime(Duration)} for more complete

   * documentation.

   */

  static <K, V> TimestampPolicyFactory<K, V> withCreateTime(Duration maxDelay) {

    SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction =

        record -> {

          checkArgument(

              record.getTimestampType() == KafkaTimestampType.CREATE_TIME,

              "Kafka record's timestamp is not 'CREATE_TIME' "

                  + "(topic: %s, partition %s, offset %s, timestamp type '%s')",

              record.getTopic(),

              record.getPartition(),

              record.getOffset(),

              record.getTimestampType());

          return new Instant(record.getTimestamp());

        };



    return (tp, previousWatermark) ->

        new CustomTimestampPolicyWithLimitedDelay<>(timestampFunction, maxDelay, previousWatermark);

  }



  /**

   * Used by the Read transform to support old timestamp functions API. This exists only to support

   * other deprecated API {@link KafkaIO.Read#withTimestampFn(SerializableFunction)}.<br>

   * TODO(rangadi): Make this package private or remove it. It was never meant to be public.

   *

   * @deprecated Use @{@link CustomTimestampPolicyWithLimitedDelay}.

   */

  @Deprecated

  static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(

	  final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {

    return (tp, previousWatermark) -> new TimestampFnPolicy<>(timestampFn, previousWatermark);

  }



  /**

   * A simple policy that uses current time for event time and watermark. This should be used when

   * better timestamps like LogAppendTime are not available for a topic.

   */

  class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {



    @Override

    public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {

      return Instant.now();

    }



    @Override

    public Instant getWatermark(PartitionContext context) {

      return Instant.now();

    }

  }



  /**

   * Assigns Kafka's log append time (server side ingestion time) to each record. The watermark for

   * each Kafka partition is the timestamp of the last record read. If a partition is idle, the

   * watermark advances roughly to 'current time - 2 seconds'. See {@link

   * KafkaIO.Read#withLogAppendTime()} for longer description.

   */

  class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {



    /**

     * When a partition is idle or caught up (i.e. backlog is zero), we advance the watermark to

     * near realtime. Kafka server does not have an API to provide server side current timestamp

     * which could ensure minimum LogAppendTime for future records. The best we could do is to

     * advance the watermark to 'last backlog check time - small delta to account for any internal

     * buffering in Kafka'. Using 2 seconds for this delta. Should this be user configurable?

     */

    private static final Duration IDLE_WATERMARK_DELTA = Duration.standardSeconds(2);



    protected Instant currentWatermark;



    public LogAppendTimePolicy(Optional<Instant> previousWatermark) {

      currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);

    }



    @Override

    public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {

      if (record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) {

        currentWatermark = new Instant(record.getTimestamp());

      } else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {

        // This is the first record and it does not have LOG_APPEND_TIME.

        // Most likely the topic is not configured correctly.

        throw new IllegalStateException(

            String.format(

                "LogAppendTimePolicy policy is enabled in reader, but Kafka record's timestamp type "

                    + "is LogAppendTime. Most likely it is not enabled on Kafka for the topic '%s'. "

                    + "Actual timestamp type is '%s'.",

                record.getTopic(), record.getTimestampType()));

      }

      return currentWatermark;

    }



    @Override

    public Instant getWatermark(PartitionContext context) {

      if (context.getMessageBacklog() == 0) {

        // The reader is caught up. May need to advance the watermark.

        Instant idleWatermark = context.getBacklogCheckTime().minus(IDLE_WATERMARK_DELTA);

        if (idleWatermark.isAfter(currentWatermark)) {

          currentWatermark = idleWatermark;

        }

      } // else, there is backlog (or is unknown). Do not advance the watermark.

      return currentWatermark;

    }

  }



  /**

   * Internal policy to support deprecated withTimestampFn API. It returns last record timestamp for

   * watermark!.

   */

  class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {



    final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;

    Instant lastRecordTimestamp;



    TimestampFnPolicy(

        SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,

        Optional<Instant> previousWatermark) {

      this.timestampFn = timestampFn;

      lastRecordTimestamp = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);

    }



    @Override

    public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {

      lastRecordTimestamp = timestampFn.apply(record);

      return lastRecordTimestamp;

    }



    @Override

    public Instant getWatermark(PartitionContext context) {

      return lastRecordTimestamp;

    }

  }

}